|
162 |
|
Полезный ресурс: https://www.youtube.com/watch?v=G6ipydgZRnE
Содержание
- Что такое
DAG
- Что такое
task
- Структура
DAG
-файла - Запуск первого
DAG
-файла вAirflow
- Подключение Python-функций
- Использование нескольких
DAG
-ов в одном файле - Обмен сообщений через
XCOM
- Как сохранять данные в
XCOM
- Как читать данные из
XCOM
- Как работает передача данных через
XCOM
- Как работает передача данных через
PythonOperator
- способы сохранения и чтения результата вXCOM
- Передача одного значения с использованием
return
- Передача нескольких значений с использованием
return
- Передача одного значения с использованием
XCOM_push
- Передача нескольких значений с использованием
XCOM_push
- Способы доступа к значениям
XCOM
- Передача одного значения с использованием
BashOperator
- способы сохранения и чтения результата вXCOM
- Как сохранять данные в
Что такое DAG
DAG
-и (Directed Acyclic Graph
) используются в Airflow
(платформа, построенная на Python
), поэтому и DAG
-и - это Python
-код.
Airflow
- это best practice
для процесса ETL
:
- extract data
- transform data
- load data
Соответственно обработку данных мы будем проводить через DAG
-и. Внутри DAG
-а получение, обработка данных идёт через задачи (таски):
task 2
/ \
task 1 task 4 - task 5
\ /
task 3
В рамках одного дага таски могут иметь абослютно разные зависимости, последовательность выполнения. В рамках каждой задачи выполняется какое-либо специфичное действие, напр.:
task 1 - получение информации из базы данных
task 2 - фильтрация информации по одним критериям и сохранение в каком-либо источнике
task 3 - фильтрация данных по другим критериям
task 4 - сохранение данных по полученным критериям из разных задач
task 5 - сохранение тех же данных, но в другом формате и в другом месте
Тем самым мы получаем выстроенный пайплайн, основанный на DAG
-е.
Что такое task
Все промежуточные действия внутри DAG
-а реализуются на уровне задач (тасок / tasks).
Работа внутри тасок держится на операторах. Операторы бывают разных типов:
airflow.operators.bash
- оператор для запуска bash-командairflow.operators.branch
- для ветвленияairflow.operators.datetime
airflow.operators.empty
airflow.operators.email
- опреатор для отправки e-mail-аairflow.operators.generic_transfer
airflow.operators.latest_only
- перезапуск одной последней задачиairflow.operators.python
- произвольный python-кодairflow.operators.subdag
airflow.operators.trigger_dugrun
- PostgresOperator - оператор для вызова sql-запросов в PostgreSQL БД
Также таски могут представлять из себя сенсоры:
PythonSensor
- ждём, когда функция вернёт TrueRedisKeySensor
- проверяет, существует ли переданный ключ в Redis хранилищеS3Sensor
- проверяет наличие объекта по ключу в S3-бакетеRedisPubSubSensor
- проверяет наличие сообщения в pub-sub очереди
Структура DAG
-файла
Импортируем модуль DAG
:
from airflow import DAG
Описание DAG-а
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
|
Разбор DAG
-файла
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
|
'owner': 'dao2',
: отображение в UI Airflow, кому принадлежит dag'start_date'
: самая ранняя дата анализа дагом, как далеко смотреть "назад в прошлое" при запускеDag
-а'provide_context': True
: передавать контекст во всех задачах"First-Dag-Name"
илиdag_id="First-Dag-Name"
: имя, под которым в UI будет отображаться этот Dag- словарь с настройками
args
мы передаём в наш даг через переменнуюdefault_args
. schedule_interval='*/1 * * * *'
: выполнять каждую минутуcatchup=False
: игнорировать настройку start_date и начинать с текущего момента#start_date=pendulum.datetime(2023, 1, 1, tz="UTC")
: закомментирован, так как настройки start_date у нас уже прописаны в args
Создаём первый полноценный работающий DAG
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
|
Запуск первого DAG
-файла в Airflow
Чтобы запустить DAG
, сначала требуется настроить окружение. Установить Airflow
локально можно несколькими способами:
- docker
- docker-compose
- pip
В статье показана установка через pip.
Подключение Python-функций
Ранее уже был показан пример синтаксиса, но он может отличаться, есть разные варианты написания DAG
-а на уровне Python.
Один из таких примеров приведён ниже:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
|
Как видно через Python-операторы и переменную python_callable
мы передаём функцию, которая объявлена выше в коде.
Также можно сначала импортировать функцию из другого файла и обращаться к функции, объявленной в импорте.
В вызываемую через python_callable
функции можно обращаться к переменным, указанным в данном даге. А именно:
[2024-01-26, 20:18:01 UTC] {logging_mixin.py:188} INFO - some job done
[2024-01-26, 20:18:01 UTC] {logging_mixin.py:188} INFO - dag details: <Task(PythonOperator): read_task>
[2024-01-26, 20:11:19 UTC] {logging_mixin.py:188} INFO - do_another_job
[2024-01-26, 20:11:19 UTC] {logging_mixin.py:188} INFO - dag details: <DAG: my_id>
Использование нескольких DAG-ов в одном файле
В одном и том же python-файле файле может быть задано более одного DAG-а, а также сколько угодно конфигураций. Именно поэтому мы явно прописываем, какую конфигурацию использовать каждому DAG-у (default_args=
), а также какие DAG-и использовать в тасках (dag=dag2
).
Код ниже является не самым очевидным, так как названия переменных совпадают для разных дагов:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
|
На уровне UI Airflow после импорта данного py-скрипта в UI просто появятся две разные задачи:
Важно: запуская задачи, мы не можем взять одну задачу от одного дага и привязать её к задаче от другого. Задачи могут принадлежать только одному дагу и соответственно совместно использоваться только внутри одного дага.
Обмен сообщений через XCOM
Даг может состоять из большого количества задач, каждая задача имеет своё уникальное название:
read_task = PythonOperator(...
print(f"dag details: {read_task}")
{logging_mixin.py:188} INFO - dag details: <Task(PythonOperator): read_task>
Более того, мы можем просмотреть все "переменные" Dag
-а:
print(f"dag details: {dag.task_dict}")
dag details: {
'start_task': <Task(EmptyOperator): start_task>,
'read_task': <Task(PythonOperator): read_task>,
'write_task': <Task(PythonOperator): write_task>,
'end_task': <Task(EmptyOperator): end_task>}
Как видно на последнем примере, данный даг состоит из четырёх тасок, каждый из которых выполняется самостоятельно (имеет свою логику, свои данные и никак не зависит от другой задачи).
Но бывают случаи, когда нам может понадобиться узнать результат какой-либо другой задачи и выполнять последующие действия в зависимости от этого результата. Для этих целей используется менеджер контекста XCOM
.
В таком случае во время выполнения первой задачи мы помещаем полученные данные в Metadata DB Airflow
(вызываем XCOM push). После чего мы сможем обращаться к этим данным из других задач.
Чтобы пользоваться данным способом передачи данных требуется разобраться с его синтаксисом:
- как сохранять данные
- как доставать данные
Как сохранять данные в XCOM
Сохранение данных будем осуществлять через PythonOperator:
- Потребуется создать задачу, в рамках которой будут получены и сохранены данные
- Преобразование и сохранение данных делается через Python-фунцию, в которой требуется учитывать некоторые моменты
Создадим python-функцию, котороая возвращает какое-либо значение:
from datetime import datetime, timedelta
def get_time():
time_is = str(datetime.now())
print(f"Time: {time_is}")
return time_is
Создадим задачу, которая вызывает эту функцию:
t1 = PythonOperator(
task_id='get_time',
python_callable=get_time
)
Напишем минимальный DAG
с этой информацией:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
|
В этом примере мы не пользовались никакими XCOM
-ами, но в UI видно, что таска возвращает значение time_is
:
Такое поведение объясняется наличием return
значения в python-функции. При использовании опреатора PythonOperator совместно с функцией return значение автоматически сохраняется в Metadata DB Airflow
.
Как читать данные из XCOM
Создадим вторую функцию, которая будет вычитывать это значение из первой задачи и в зависимости от него выводить результат:
1 2 3 4 5 6 7 8 9 10 11 12 |
|
Что здесь важно:
В задаче t2 мы сказали, чтобы в функцию передавался контекст: provide_context=True
. А в самой функции добавили обработку именованных аргументов print_result(**context)
.
Какое слово использовать, не имеет значения. Кто-то пишет kwargs, кто-то context. Важно, чтобы этот аргумент начинался с **любое_название
.
Слово context
используется не просто так. Даже в задаче параметр, отвечающий за передачу контекста называется provide_context
. Поэтому будем использовать именно это слово.
Если вы выполните этот код и взгляните в логах на вывод строки print(f"context: {context}")
, то увидите огромную простыню информации:
[2024-01-27T02:52:29.632+0300] {logging_mixin.py:188} INFO - context:
{'conf': <airflow.configuration.AirflowConfigParser object at 0x7f04b5b63fd0>,
'dag': <DAG: xcom_minimal>,
'dag_run': <DagRun xcom_minimal @ 2024-01-26 23:52:24.792984+00:00: manual__2024-01-26T23:52:24.792984+00:00
...
То есть это информация, передавамая в связи с вызовом данной задачи, является контекстом этой задачи (то есть содержит описание всех деталей, относящихся к задаче). Один из перечисленных там параметров - это 'ti':
'task': <Task(PythonOperator): print_result>,
'task_instance': <TaskInstance: xcom_minimal.print_result manual__2024-01-26T23:52:24.792984+00:00 [running]>,
'task_instance_key_str': 'xcom_minimal__print_result__20240126',
'test_mode': False,
'ti': <TaskInstance: xcom_minimal.print_result manual__2024-01-26T23:52:24.792984+00:00 [running]>
На момент исполнения задачи этот параметр содержит:
- информацию о названии таска:
`TaskInstance: xcom_minimal.print_result
- как и когда он запущен (вручную):
manual__2024-01-26
- его статус:
running
Эта информация дана для общего ознакомления, чтобы объяснить, что существует контекст, который представляет из себя словарь и у которого имеется ключ ti
.
Как работает передача данных через XCOM
Как и почему мы можем получать данные из других задач через этот ключ? Данный ключ может использоваться и используется XCOM
-ом при необходимости передачи информации.
Чтобы подключиться к XCOM
-у, нам нужен контекст и мы активируем его передачу внутри таска:
1 2 |
|
Контекст - это словарь, чтобы работать с ним в функции, включаем приём именованных аргументов:
def print_result(**context):
Чтобы получить возможность взаимодейстовавать с менеджером XCOM
(неважно для чтения или записи), создаём переменную:
tsk_inst = context['ti']
После чего мы обращаемся к XCOM
и вычитываем необходимое значение по id-таска, в рамках которого оно было туда сохранено.
res = tsk_inst.xcom_pull(task_ids='get_time')
Другими словами: мы лезем в Airflow DB и по названию задачи достаём оттуда значение, сохраненное после выполнения задачи.
Рассмотрим финальный пример:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
|
- Мы создали три задачи: t1, t2, t3
- Мы запустили их последовательно:
t1 >> t2 >> t3
- В первой задаче (а точнее в функции
get_time
) за счёт использованияreturn
мы сохранили результат в Metadata базе данных) - Во второй задаче мы проверили результат и сохранили его в XCOM за счёт использования
return
- В третьем таске просто дополнительно распечатали результат обеих задач
#task 1
INFO - Time: 2024-01-27 03:48:13
#task 2
INFO - res: 2024-01-27 03:48:13
INFO - Корректная длина данных
#task 3
INFO - xcom_pull 'get_time': 2024-01-27 03:48:13
INFO - xcom_pull 'print_result': Ok
PythonOperator
- способы сохранения и чтения результата в XCOM
Передача одного значения с использованием return
Один из способов - с использованием функции return
- был уже показан выше:
- создаётся task типа
PythonOperator
, который вызывает python-функцию - сама функция возвращает значение через return
from datetime import datetime, timedelta
def get_time():
time_is = str(datetime.now())
print(f"Time: {time_is}")
return time_is
t1 = PythonOperator(
task_id='get_time',
python_callable=get_time
)
Чтобы прочитать это значение:
- мы создаём второй task, из-под которого также вызываем Python-функцию
- в таске обязательно передаём контекст в вызываемую функцию
- внутри функции вычитываем значение по имени того таска, внутри которого сохранялось это значение
1 2 3 4 5 6 7 8 9 10 11 12 |
|
Передача нескольких значений с использованием return
Если требуется передать несколько значений, то ничто не мешает вернуть значение, напр., в виде списка, словаря или какого-либо другого типа данных.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
|
После чего можно вычитать значение, используя синтаксис используемого типа данных:
result: ['значение 1', 'значение 2', 'значение 3']
2-й элемент списка: значение 2
Передача одного значения с использованием XCOM_push
При передаче значения через xcom_push
необходимо помнить, что и в функцию на запись и в функцию на чтение требуется передавать контекст.
from airflow.operators.python import PythonOperator
from airflow import DAG
from datetime import datetime, timedelta
args = {
'owner': 'dao2',
'start_date': datetime.now() - timedelta(days=1),
}
with DAG (
dag_id="return_py_xcom",
default_args=args,
schedule_interval='1 */1 * * *',
catchup=False,
tags=["dao2", "return", "py", "xcom"],
) as dag:
def save_value_over_xcom(**context):
tsk_inst = context['ti']
tsk_inst.xcom_push(key="save_value_over_xcom", value="Сохранено через XCOM")
def print_result(**context):
tsk_inst = context['ti']
result = tsk_inst.xcom_pull(key="save_value_over_xcom", task_ids='save_value_over_xcom')
print(f"result: {result}")
t1 = PythonOperator(
task_id='save_value_over_xcom',
python_callable=save_value_over_xcom,
provide_context=True,
)
t2 = PythonOperator(
task_id='print_result',
python_callable=print_result,
provide_context=True,
dag=dag
)
t1 >> t2
Также - при вычитывании данных надо обязательно указывать не только задачу, из которой вычитывается значение, но и ключ, в который значение было записано:
1 2 3 4 5 |
|
Передача нескольких значений с использованием XCOM_push
Первый способ - это передавать значение в виде списка или любого другого типа данных. Описывалось в главе "Передача нескольких значений с использованием return
".
Также можно записывать требуемые значения в отдельные xcom-значения:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
|
Способы доступа к значениям XCOM
Ранее для чтения значений внутри задачи t2
всегда создавался отдельная python-функция, осуществляющая чтение. Если же требуется получить значение внутри задачи, напр., то к нему можно обращаться через синтаксис:
"{{ ti.xcom_pull(key='last_card_table', task_ids='get_last_tables') }}",
Вариант применения будет продемонстрирован далее, при использовании BashOperator
-а.
BashOperator
- способы сохранения и чтения результата в XCOM
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
|
В отличии от предыдущих примеров сохранение и обращение к XCOM-значениям тут происходит:
- внутри задач
- с использованием особого синтаксиса
Данный синтаксис можно использовать и в других операторах.